ElasticsearchのIngest Nodeを試してみた

ElasticsearchのIngest Nodeを試してみた

Clock Icon2016.06.20

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

こんにちは、藤本です。

Elastic Stack 5もalpha3までリリースされました。GAリリースが待ち遠しいです。今回はElasticsearch 5の新機能の中でも注目している方が多いであろうIngest Nodeを触ってみましたので、ご紹介します。

公式ドキュメントでも既にIngest Nodeの情報は公開されています。

検証時点はバージョンalpha3です。GAでは変わっている可能性があることはご留意ください。

Ingest Node

Elasticsearch自体でインデックスする前にデータ変換/加工する機能となります。LogstashのFilter PluginやFluentdの一部機能をElasticsearch側で処理することができます。それによりLogstash/Fluentdをデータ変換/加工用に置いていた環境ではLogstash/Fluentdを取り除くことができ、サーバーコストや運用コストを省くことができます。

Ingest Nodeはノードタイプの一つとして加わります。ノード単位で設定が可能なため、Data Nodeと兼用させることも、パフォーマンスに不安を覚えるようであれば、専用のノードとして配置することも可能です。Elasticsearchはノード間でクラスタを構成を組むことができ、負荷分散、可用性向上を図ることができるので、SPOF、パフォーマンス過負荷になりがちな集約ノードを避けることができるのは嬉しいことです。

Elastic{ON}のIngest Nodeのスライドを以下に転載します。

ユースケース

Elastic社が2つのユースケースを紹介しています。

一つ目がFilebeatのパース処理です。Filebeatは軽量なデータシッパーであり、多くのデータ変換/加工処理を持ちません。詳しくは過去エントリ「Beatsのアウトプットを加工する」をご参照ください。このケースはFilebeatに限らず、Fluent Bitや省力なセンサーから出力されるデータなども該当します。多くの場合、システムのある役割を果たすサーバーやセンサーは収集データの加工をするためにリソースを割り当てているわけではありません。その加工処理を外出しすることで本来の処理に集中してもらいます。その加工処理をElasticsearchに委譲することができます。

もう一つがReindexによるインデックスの変換です。Elasticsearch 2.3系でリリースされたReindex APIと組み合わせて既存インデックスのデータを変換して、新しいインデックスを生成することができます。

利用方法

Ingest Nodeを利用するには2つの手順を踏む必要があります。

  1. (必要に応じて)Simulate API
  2. Ingest Pipelineの定義
  3. Pipelineを利用したAPI

簡単に書くとこんな感じです。

ingest-pipeline

Simulate API

Ingest APIはSimulate APIが実装されています。それによりドキュメントに対してどのような作用がもたらされるかを事前に検証することができます。Logstashをいざ動かしてみたけど想定通りじゃなかったということが私は多々あるので嬉しい機能です。実際に実装する前にSimulate APIで試すと後戻りも少なく設定することができます。またSimulate APIは既に作成済みのPipelineに対しても実施することができます。

Simulate APIのSyntaxは以下のようになります。詳細は後の工程で説明します。

POST _ingest/pipeline/_simulate
{
  "pipeline" : {
    // pipeline definition here
  },
  "docs" : [
    { /** first document **/ },
    { /** second document **/ },
    // ...
  ]
}

Ingest Pipelineの定義

Ingestを利用する場合、Pipelineという前処理を定義します。PipelineにProcessorというデータ変換処理を組み合わせることで受信するドキュメントを本来利用したい形式のドキュメントとして加工することができます。Pipelineが持つ設定はコメントと単数、もしくは複数のProcessorだけです。Processorは上から書いた順番で処理されます。

{
  "description" : "...",
  "processors" : [ ... ]
}

Processor

PipelineはProcessorといういくつかの変換/加工処理を持っています。LogstashでいうFilter Pluginと考えれば分かりやすいかと思います。現在、18のProccessorが標準実装されいます。

標準実装に加えてPluginという形で以下の2つのProcessorが提供されています。

Pipelineを利用したIndex API

Ingest Pipelineを利用する場合、Index APIに対してクエリストリングで定義したPipelineを指定します。

PUT my-index/my-type/my-id?pipeline=my_pipeline_id
{
  "foo": "bar"
}

やってみた

  • 動作環境
    • OS : CentOS 7
    • Elasticsearch : 5.0.0-alpha3

Processor

まずは上で紹介したいくつかのProcessorをSimulate APIを利用して試してみます。

Set Processor

Set Processorはドキュメントに対してフィールドを追加、既にフィールドがあれば、更新するProcessorです。動作として分かりやすいProcessorなのでこちらでまずはSimulate APIのSyntaxやPipelineの動作を見てもらえば、と思います。

### API
curl -XPOST "http://localhost:9200/_ingest/pipeline/_simulate" -d'
{
  "pipeline" : {
    "processors" : [
      {
        "set" : {
          "field" : "add_field",
          "value" : "add_value"
        }
      }
    ]
  },
  "docs" : [
    {
      "_source" : {
        "msg" : "simulate ingest api"
      }
    }
  ]
}'

### Output
{
  "docs": [
    {
      "doc": {
        "_id": "_id",
        "_type": "_type",
        "_index": "_index",
        "_source": {
          "msg": "simulate ingest api",
          "add_field": "add_value"
        },
        "_ingest": {
          "timestamp": "2016-06-18T03:26:05.332+0000"
        }
      }
    }
  ]
}

Simulate APIのSyntaxは大きくpipelinedocsに分かれます。pipelineには既に説明したPipelineの設定を記述します。docsにはPipelineに入力するドキュメントを記述します。

上記例では、msgフィールドしか持たないドキュメントを与えると、PipelineのSet Processorが処理されることで、add_fieldというフィールドが追加されたドキュメントに変換されました。

なんとなくIngest Node、Pipelineの挙動を分かりましたでしょうか?

Date Processor

次はDate Processorです。文字列の日付を多様なフォーマットで解釈することで、Elasticsearchのstrict_date_optional_timeフォーマットに変換します。入力フォーマットには任意なフォーマット(yyyy/MM/dd hh:mm:ss)や、標準実装されたフォーマット(ISO8601UNIXUNIX_MSTAI64N)を指定可能です。

### API
curl -XPOST "http://localhost:9200/_ingest/pipeline/_simulate" -d'
{
  "pipeline" : {
    "processors" : [
      {
        "date" : {
          "field" : "unixtimestamp",
          "target_field" : "timestamp",
          "formats" : ["UNIX_MS"],
          "timezone" : "Asia/Tokyo"
        }
      }
    ]
  },
  "docs" : [
    {
      "_source" : {
        "unixtimestamp" : "1466224357000"
      }
    }
  ]
}'

### Output
{
  "docs": [
    {
      "doc": {
        "_id": "_id",
        "_type": "_type",
        "_index": "_index",
        "_source": {
          "timestamp": "2016-06-18T13:32:37.000+09:00",
          "unixtimestamp": "1466224357000"
        },
        "_ingest": {
          "timestamp": "2016-06-18T04:29:13.864+0000"
        }
      }
    }
  ]
}

unixtimestampフィールドで与えたミリ秒をUNIXタイムスタンプとしてパースし、timestampフィールドをstrict_date_optional_timeフォーマットで追加しています。

Grok Processor

Grok ProcessorはLogstashのGrok Filter Plugin、FluentdのGrok Pluginと同等のProcessorです。文字列を指定した形式でパースできます。

### API
curl -XPOST "http://localhost:9200/_ingest/pipeline/_simulate" -d'
{
  "pipeline" : {
    "processors" : [
      {
        "grok" : {
          "field" : "msg",
          "patterns" : ["%{NUMBER:id:int}:%{WORD:company}"]
        }
      }
    ]
  },
  "docs" : [
    {
      "_source" : {
        "msg" : "1:classmethod"
      }
    }
  ]
}'

### Output
{
  "docs": [
    {
      "doc": {
        "_id": "_id",
        "_type": "_type",
        "_index": "_index",
        "_source": {
          "msg": "1:classmethod",
          "company": "classmethod",
          "id": 1
        },
        "_ingest": {
          "timestamp": "2016-06-18T07:27:24.404+0000"
        }
      }
    }
  ]
}

msgフィールドの文字列が:区切りでidcompanyそれぞれのフィールドに展開されました。

GeoIP Processor

GeoIP Processorは入力されたIPアドレスをGeoLite2データベースと照合し、取得した情報をフィールドに追加します。

Pluginインストール

ElasticsearchのPluginコマンドで簡単にインストールできます。

# /usr/share/elasticsearch/bin/elasticsearch-plugin install ingest-geoip
-> Downloading ingest-geoip from elastic
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
@     WARNING: plugin requires additional permissions     @
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
* java.lang.RuntimePermission accessDeclaredMembers
See http://docs.oracle.com/javase/8/docs/technotes/guides/security/permissions.html
for descriptions of what these permissions allow and the associated risks.

Continue with installation? [y/N]y
-> Installed ingest-geoip
GeoIP Processor
### API
curl -XPOST "http://localhost:9200/_ingest/pipeline/_simulate" -d'
{
  "pipeline" : {
    "processors" : [
      {
        "geoip" : {
          "field" : "ip"
        }
      }
    ]
  },
  "docs" : [
    {
      "_source" : {
        "ip" : "8.8.8.8"
      }
    }
  ]
}'

### Output
{
  "docs": [
    {
      "doc": {
        "_id": "_id",
        "_type": "_type",
        "_index": "_index",
        "_source": {
          "geoip": {
            "continent_name": "North America",
            "city_name": "Mountain View",
            "country_iso_code": "US",
            "region_name": "California",
            "location": {
              "lon": -122.0838,
              "lat": 37.386
            }
          },
          "ip": "8.8.8.8"
        },
        "_ingest": {
          "timestamp": "2016-06-18T07:46:42.858+0000"
        }
      }
    }
  ]
}

Apacheのaccess_logを解析する

次に複数のProcessorを組み合わせて、Apacheのaccess_logをKibanaで可視化しやすいようにパースするPipelineを構成します。ここでは細かい説明はしませんが、Grokによりパースされ、Dateにより日付が変換され、GeoIPによりIPアドレスを解析され、Removeにより入力フィールド、変換前の日付フィールドを削除されました。processorsに記載した処理が上から順に処理されていることが分かります。?verboseのクエリストリングをAPIに与えることで各Processorの処理結果によりどのようにドキュメントが変化されるのかをトレースすることができます。

### API
curl -XPOST "http://localhost:9200/_ingest/pipeline/_simulate" -d'
{
  "pipeline" : {
    "processors" : [
      {
        "grok" : {
          "field" : "message",
          "patterns" : ["%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \\[%{HTTPDATE:timestamp}\\] \"%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}"]
        }
      },
      {
        "date" : {
          "field" : "timestamp",
          "formats" : ["dd/MMM/YYYY:HH:mm:ss Z"]
        }
      },
      {
        "geoip" : {
          "field" : "clientip"
        }
      },
      {
        "remove" : {
          "field" : "message"
        }
      },
      {
        "remove" : {
          "field" : "timestamp"
        }
      }
    ]
  },
  "docs" : [
    {
      "_source" : {
        "message" : "83.149.9.216 - - [20/Sep/2015:15:13:42 +0000] \"GET /presentations/logstash-monitorama-2013/images/kibana-dashboard3.png HTTP/1.1\" 200 171717 \"http://semicomplete.com/presentations/logstash-monitorama-2013/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\""
      }
    }
  ]
}'

### Output
{
  "docs": [
    {
      "doc": {
        "_id": "_id",
        "_type": "_type",
        "_index": "_index",
        "_source": {
          "request": "/presentations/logstash-monitorama-2013/images/kibana-dashboard3.png",
          "agent": "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"",
          "geoip": {
            "continent_name": "Europe",
            "city_name": "Moscow",
            "country_iso_code": "RU",
            "region_name": "Moscow",
            "location": {
              "lon": 37.6156,
              "lat": 55.7522
            }
          },
          "auth": "-",
          "ident": "-",
          "verb": "GET",
          "referrer": "\"http://semicomplete.com/presentations/logstash-monitorama-2013/\"",
          "@timestamp": "2015-09-20T15:13:42.000Z",
          "response": 200,
          "bytes": 171717,
          "clientip": "83.149.9.216",
          "httpversion": "1.1"
        },
        "_ingest": {
          "timestamp": "2016-06-18T08:06:37.881+0000"
        }
      }
    }
  ]
}

それでは実際にPipelineを作成して、access_logをインデックスしてみましょう。

Pipeline作成

先ほどSimulate APIで利用したPipelineを作成します。

### API
curl -XPUT "http://localhost:9200/_ingest/pipeline/access_log_pipeline" -d'
{
  "processors" : [
    {
      "grok" : {
:(省略)
    }
  ]
}'

### Output
{"acknowledged": true}

作成したPipelineを確認します。

### API
curl -XGET "http://localhost:9200/_ingest/pipeline/access_log_pipeline"

### Output
{
  "pipelines": [
    {
      "id": "access_log_pipeline",
      "config": {
        "processors": [
:(省略)
        ]
      }
    }
  ]
}

Pipelineを利用したIndex API

### API
curl -XPOST "http://localhost:9200/accesslog-2016.06.19/log?pipeline=access_log_pipeline" -d'
{
  "message" : "83.149.9.216 - - [20/Sep/2015:15:13:42 +0000] \"GET /presentations/logstash-monitorama-2013/images/kibana-dashboard3.png HTTP/1.1\" 200 171717 \"http://semicomplete.com/presentations/logstash-monitorama-2013/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\""
}'

### Output
curl -XPOST "http://localhost:9200/accesslog-2016.06.19/log?pipeline=access_log_pipeline" -d'
{
  "message" : "83.149.9.216 - - [20/Sep/2015:15:13:42 +0000] \"GET /presentations/logstash-monitorama-2013/images/kibana-dashboard3.png HTTP/1.1\" 200 171717 \"http://semicomplete.com/presentations/logstash-monitorama-2013/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\""
}'

確認します。

### API
curl -XGET "http://localhost:9200/accesslog-2016.06.19/log/AVVmXxjaMtSwPwddbzoO"

### Output
{
  "_index": "accesslog-2016.06.19",
  "_type": "log",
  "_id": "AVVmXxjaMtSwPwddbzoO",
  "_version": 1,
  "found": true,
  "_source": {
    "request": "/presentations/logstash-monitorama-2013/images/kibana-dashboard3.png",
    "agent": "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"",
    "geoip": {
      "continent_name": "Europe",
      "city_name": "Moscow",
      "country_iso_code": "RU",
      "region_name": "Moscow",
      "location": {
        "lon": 37.6156,
        "lat": 55.7522
      }
    },
    "auth": "-",
    "ident": "-",
    "verb": "GET",
    "referrer": "\"http://semicomplete.com/presentations/logstash-monitorama-2013/\"",
    "@timestamp": "2015-09-20T15:13:42.000Z",
    "response": 200,
    "bytes": 171717,
    "clientip": "83.149.9.216",
    "httpversion": "1.1"
  }
}

Simulate APIと同じようにドキュメントが変換されて、インデックスされていることが分かります。

まとめ

いかがでしたでしょうか?

  • Logstashを利用したことがある方であれば、設定は簡単
  • 小規模環境は導入の敷居は低そう。大規模環境は専用のIngest Nodeを立てるなど構成を検討する必要がある
  • Processorの種類があまり多くない。Pluginにより実装可能

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.